feat: 2-3x faster trace dataset loading via HashIdRandomGenerator #724
ajcasagrande wants to merge 10 commits into main
Conversation
Add support for Alibaba Bailian trace format (bailian_trace) with multi-turn conversation linking via chat_id/parent_chat_id and 16-token SipHash blocks. Extract common trace loading logic into BaseTraceDatasetLoader to share infrastructure between Mooncake and Bailian loaders. Make block_size configurable per-loader via plugin metadata default_block_size, and generalize mooncake-specific validation to work with any trace dataset type. Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
- Remove .value on enum in error message (use string-based enum directly) - Validate mean is not None before cached prompt generation - Add cycle detection in Bailian find_root to prevent infinite loops - Reset filtering counters per load_dataset() call to avoid over-reporting Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
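The cycle-detection fix in find_root can be sketched as a visited-set walk over parent links. This is a minimal illustration only: the function signature and the dict-based parent map are assumptions, not the actual Bailian loader code.

```python
def find_root(chat_id: str, parents: dict[str, str]) -> str:
    """Follow parent_chat_id links upward, refusing to loop on cyclic data."""
    seen: set[str] = set()
    current = chat_id
    while current in parents:
        if current in seen:
            # Malformed trace: the parent chain loops back on itself.
            raise ValueError(f"cycle detected in parent chain at {current!r}")
        seen.add(current)
        current = parents[current]
    return current
```

Without the `seen` set, a trace whose parent chain contains a loop would spin forever; with it, the loader fails fast on the malformed record.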
Try out this PR
Quick install:
pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@766253cc79dadf4535b6e294ff176feac8dc437a
Recommended with virtual environment (using uv):
uv venv --python 3.12 && source .venv/bin/activate
uv pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@766253cc79dadf4535b6e294ff176feac8dc437a
Last updated for commit:
Tokenizers like Kimi use `allow_special_tokens` instead of the standard `add_special_tokens` for encode, and their `decode()` doesn't accept `skip_special_tokens`. Passing unsupported kwargs triggers the slow `PreTrainedTokenizer.super()` fallback path, causing ~5000x slower decode (~204ms vs 0.04ms per 4500 tokens). After loading, inspect the tokenizer's method signatures and override the default call/encode/decode args to match. Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
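The signature inspection described above can be sketched as follows. This is a hedged illustration, not the aiperf implementation: `supports_kwarg` and the `KimiLikeTokenizer` stand-in are hypothetical names introduced here.

```python
import inspect

def supports_kwarg(obj: object, method_name: str, kwarg: str) -> bool:
    """True only if obj.<method_name> exists and its signature accepts `kwarg`."""
    method = getattr(obj, method_name, None)
    if method is None:
        return False
    try:
        return kwarg in inspect.signature(method).parameters
    except (TypeError, ValueError):
        return False  # some C-level tokenizer methods are not introspectable

class KimiLikeTokenizer:
    """Hypothetical stand-in for a tokenizer with non-standard kwargs."""
    def encode(self, text: str, allow_special_tokens: bool = False) -> list[int]:
        return [ord(c) for c in text]
    def decode(self, ids: list[int]) -> str:  # no skip_special_tokens kwarg
        return "".join(chr(i) for i in ids)

tok = KimiLikeTokenizer()
# Decide the encode kwargs once at load time, so per-call code never passes
# an unsupported kwarg and triggers the slow fallback path.
encode_kwargs: dict[str, bool] = {}
if supports_kwarg(tok, "encode", "add_special_tokens"):
    encode_kwargs["add_special_tokens"] = False
elif supports_kwarg(tok, "encode", "allow_special_tokens"):
    encode_kwargs["allow_special_tokens"] = False
```

Probing once after loading keeps the hot encode/decode path free of try/except retries on every call.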
Gracefully handle unrecognized 'type' field values during dataset type inference by skipping the explicit type shortcut and falling through to structural detection. This fixes Bailian traces (which use "type" for request type, not dataset type) auto-detecting correctly. Also updates CLI descriptions to reference both trace formats and adds tests for the type field fallback behavior. Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
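The fallback behavior can be illustrated with a toy inference function; the function name, the structural cues, and the returned type strings are assumptions for illustration, not the loader's actual API.

```python
def infer_dataset_type(record: dict, known_types: set[str]) -> str:
    """Use an explicit, recognized 'type' field; otherwise inspect structure."""
    explicit = record.get("type")
    if explicit in known_types:
        return explicit
    # Unrecognized 'type' values (e.g. Bailian uses 'type' for the request
    # type, not the dataset type) fall through to structural detection.
    if "chat_id" in record and "parent_chat_id" in record:
        return "bailian_trace"
    if "hash_ids" in record:
        return "mooncake_trace"
    raise ValueError("unable to infer dataset type")
```

The key point is that an unknown `type` value is treated as "no shortcut available" rather than as an error.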
Force-pushed from eb9491c to 3617fdb (Compare)
Add HashIdRandomGenerator that deterministically seeds per (trace_id, hash_id) pair, enabling parallel token generation across workers without lock contention or cache coordination. Extract parallel_convert module to leverage multiprocessing with shared-memory token corpus. Stream conversations through composers to the backing store instead of materializing the full dataset in memory. Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
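The per-(trace_id, hash_id) seeding idea can be sketched with a stable digest; this is an illustrative assumption about the approach, not the actual HashIdRandomGenerator internals.

```python
import hashlib
import random

def rng_for(base_seed: int, trace_id: int, hash_id: int) -> random.Random:
    """Derive an independent, reproducible RNG for one (trace_id, hash_id) pair.

    A stable digest (not Python's per-process salted hash()) keeps the stream
    identical across worker processes, so token generation can run in parallel
    with no lock contention or cache coordination.
    """
    material = f"{base_seed}:{trace_id}:{hash_id}".encode()
    seed = int.from_bytes(hashlib.sha256(material).digest()[:8], "big")
    return random.Random(seed)

a = rng_for(42, trace_id=7, hash_id=3).random()
b = rng_for(42, trace_id=7, hash_id=3).random()  # same pair: same stream
c = rng_for(42, trace_id=7, hash_id=4).random()  # different pair: independent
```

Because every worker can recompute the same seed from the pair alone, shared prompt blocks come out identical regardless of which process generates them.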
@coderabbitai review
✅ Actions performed: Review triggered.
Walkthrough
This pull request introduces a deterministic hash-based random generator, converts dataset composition from batch-based to streaming iterators, replaces parallel token decoding with parallel trace-to-conversation conversion, and adds tokenizer keyword argument override detection. Over thirty files are modified with comprehensive test coverage.
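The batch-to-streaming change can be illustrated with a toy generator pipeline; the function and field names here are illustrative, not the composer's real API.

```python
from typing import Iterable, Iterator

def compose_conversations(traces: Iterable[dict]) -> Iterator[dict]:
    """Yield conversations one at a time instead of materializing a full list."""
    for trace in traces:
        yield {"turns": [{"prompt_len": trace["input_length"]}]}

def store_all(conversations: Iterable[dict], store: list) -> int:
    """Drain the iterator straight into the backing store."""
    count = 0
    for conv in conversations:
        store.append(conv)
        count += 1
    return count
```

Streaming through the composer means peak memory is bounded by one conversation at a time rather than the whole dataset.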
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
parallel_decode and _init_worker were not forwarding tokenizer config args, causing failures with tokenizers like Kimi that require trust_remote_code=True. Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
…/even-faster-trace-datasets
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/aiperf/dataset/protocols.py (1)
20-20: ⚠️ Potential issue | 🟡 Minor
Align protocol documentation with the new iterable return contract.
At line 20, the protocol docstring still says “list of Conversation objects,” but the method now returns Iterable[Conversation]. That mismatch can cause incorrect caller assumptions (e.g., that len() and indexing are always available).
Proposed doc fix
 class CustomDatasetLoaderProtocol(Protocol):
-    """Protocol for custom dataset loaders that load dataset from a file and convert it to a list of Conversation objects."""
+    """Protocol for custom dataset loaders that load data from a file and convert it to an iterable of Conversation objects.
+
+    Implementations may return either a materialized list or a single-pass iterator.
+    """
Also applies to: 49-51
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/aiperf/dataset/protocols.py` at line 20, Update the protocol docstrings to reflect the new iterable return contract: change any wording that says “list of Conversation objects” to “Iterable[Conversation]” and explicitly note that callers should not assume list ops like len() or indexing are available; update the main protocol docstring (the one starting “Protocol for custom dataset loaders...”) and the related docblock referenced around lines 49–51 to match this wording and mention that callers should consume the returned Iterable via iteration or by materializing it into a list if random access is required.
🧹 Nitpick comments (4)
tests/unit/dataset/loader/test_parallel_convert.py (1)
481-522: Remove unused tmp_path fixture parameter.
The tmp_path parameter is not used in this test. The test creates its own shared memory and doesn't need a temporary directory.
🧹 Remove unused parameter
-    def test_init_worker_sets_up_state(self, sample_corpus_array, tmp_path):
+    def test_init_worker_sets_up_state(self, sample_corpus_array):
         """_init_worker should populate _worker dict with all required fields."""
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/dataset/loader/test_parallel_convert.py` around lines 481 - 522, the test test_init_worker_sets_up_state contains an unused fixture parameter tmp_path; remove tmp_path from the test signature so the test becomes def test_init_worker_sets_up_state(self, sample_corpus_array): and update any references if present, leaving the body that creates shared_memory, constructs _WorkerInitArgs, calls _init_worker, and asserts on parallel_convert_mod._worker_state unchanged; ensure the test imports/mocks (e.g., Tokenizer.from_pretrained) and the cleanup resetting parallel_convert_mod._worker_state and closing/unlinking shm remain intact.
tests/unit/dataset/test_dataset_manager.py (1)
465-466: Consider completing the verification comments.
The comments "Verify in-memory dataset was not materialized" on lines 465-466 and 490-491 appear to be placeholders without actual assertions. If verification is no longer needed (since the in-memory dataset no longer exists), consider removing these comments or adding actual assertions if verification is intended.
🧹 Remove incomplete verification comments
-    # Verify in-memory dataset was not materialized
-    # Request should still work via dataset client
Also applies to: 490-491
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/dataset/test_dataset_manager.py` around lines 465 - 466, locate the two placeholder comments "Verify in-memory dataset was not materialized" in tests/unit/dataset/test_dataset_manager.py and either remove them or replace them with concrete assertions: call the test's DatasetManager instance (e.g., dataset_manager) and assert the dataset is not marked materialized (e.g., assert not dataset_manager.is_materialized(dataset_id)) or assert no materialized file/path exists on disk (e.g., assert not os.path.exists(materialized_path)); if the project uses an internal map (e.g., dataset_manager._materialized or dataset_manager.materialized_datasets), assert the dataset_id is not present there instead. Ensure the chosen assertion uses existing public methods/attributes in the test context (materialize_dataset, is_materialized, materialized_datasets, or filesystem path helpers) so the verification is real and not just a comment.
tests/unit/common/test_hash_id_random_generator.py (1)
15-22: Optional cleanup: de-duplicate RNG setup fixture.
Both test classes use the same autouse RNG reset/init fixture; extracting one module-level fixture would keep this file a bit leaner.
Also applies to: 215-222
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/common/test_hash_id_random_generator.py` around lines 15 - 22, the RNG setup/reset fixture is duplicated across test classes; keep a single module-level autouse fixture instead: define the existing setup_rng fixture once at the top of the test module (using `@pytest.fixture`(autouse=True)) that calls rng.reset(), rng.init(42), yield, then rng.reset(), and remove the duplicate fixture definitions (e.g., the other occurrence around lines 215-222); keep references to the same symbols (setup_rng, rng.reset, rng.init) so all tests get the shared setup.
tests/unit/dataset/loader/test_base_trace_loader.py (1)
243-348: Refactor threshold dispatch tests into one parameterized test.
These tests are data-driven variants of the same assertion pattern. Consolidating with @pytest.mark.parametrize will reduce duplication and keep boundary coverage clearer.
Possible consolidation
+    @pytest.mark.parametrize(
+        ("trace_count", "expect_parallel"),
+        [
+            (_MIN_TRACES_FOR_PARALLEL + 1, True),
+            (_MIN_TRACES_FOR_PARALLEL, True),
+            (_MIN_TRACES_FOR_PARALLEL - 1, False),
+        ],
+    )
+    def test_threshold_dispatch_behavior(
+        self,
+        create_jsonl_file,
+        mock_prompt_generator,
+        default_user_config,
+        trace_count,
+        expect_parallel,
+    ):
+        content = [
+            f'{{"input_length": 100, "hash_ids": [{i}], "timestamp": {i * 1000}}}'
+            for i in range(trace_count)
+        ]
+        filename = create_jsonl_file(content)
+        loader = MooncakeTraceDatasetLoader(
+            filename=filename,
+            user_config=default_user_config,
+            prompt_generator=mock_prompt_generator,
+        )
+        data = loader.load_dataset()
+        with patch(
+            "aiperf.dataset.loader.base_trace_loader.parallel_convert", return_value=[]
+        ) as mock_parallel:
+            list(loader.convert_to_conversations(data))
+        if expect_parallel:
+            mock_parallel.assert_called_once()
+        else:
+            mock_parallel.assert_not_called()
As per coding guidelines, "Use @pytest.mark.parametrize for data-driven tests".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/dataset/loader/test_base_trace_loader.py` around lines 243 - 348, Tests test_large_dataset_uses_parallel, test_exactly_threshold_uses_parallel, and test_below_threshold_uses_single_threaded are duplicate threshold variations and should be consolidated into one pytest.mark.parametrize test; create a single test (e.g., test_threshold_dispatch_parametrized) that takes a trace_count and expected_parallel boolean, build content using _MIN_TRACES_FOR_PARALLEL +/- values, instantiate MooncakeTraceDatasetLoader as before, call loader.load_dataset() and then patch aiperf.dataset.loader.base_trace_loader.parallel_convert and call list(loader.convert_to_conversations(data)), finally assert mock_parallel.assert_called_once() when expected_parallel is True or mock_parallel.assert_not_called() when False; keep _MIN_TRACES_FOR_PARALLEL, MooncakeTraceDatasetLoader, convert_to_conversations, and parallel_convert references to locate code.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/aiperf/common/hash_id_random_generator.py`:
- Around line 46-49: The truthiness check in HashIdRandomGenerator.from_base_rng
incorrectly treats seed 0 as falsy; change the logic to preserve an explicit
seed of 0 by testing for None instead of using `or`. Specifically, in
from_base_rng use a None check on base_rng.seed (e.g., base_seed = base_rng.seed
if base_rng.seed is not None else base_rng.randrange(...)) so that a seed value
of 0 is kept; this references the from_base_rng method and the base_rng.seed
attribute on RandomGenerator.
In `@src/aiperf/common/tokenizer.py`:
- Around line 51-56: The helper _supports_kwarg currently calls
inspect.signature(method) without guarding for introspection failures; update
_supports_kwarg to catch exceptions like ValueError and TypeError raised by
inspect.signature and return False in those cases so non-introspectable
tokenizer methods are treated the same as missing methods (i.e., no kwarg
support). Locate the _supports_kwarg function and wrap the
inspect.signature(method).parameters access in a try/except that returns False
on ValueError/TypeError, keeping existing behavior for missing methods.
In `@src/aiperf/dataset/dataset_manager.py`:
- Around line 159-160: The Kubernetes-mode log message in dataset_manager.py
builds an f-string containing self._conversation_count and currently ends with
an extra closing parenthesis in the literal "backing store)"; update the string
used in the logging call (the f"...Kubernetes mode: skipped local client,
compressed {self._conversation_count} conversations into backing store)") to
remove the stray ')' so the message reads without the extra parenthesis.
In `@src/aiperf/dataset/generator/prompt.py`:
- Around line 289-299: The cache key uses only hash_id which breaks when
current_block_size varies; change the caching to include current_block_size
(e.g., use a tuple key like (hash_id, current_block_size)) so that self._cache
stores size-specific samples; update any annotations for self._cache
accordingly, ensure you still call
self._hash_id_corpus_rng.reseed_for_hash_id(hash_id) and populate the cache
using sample_tokens_from_corpus(self._tokenized_corpus, current_block_size,
self._hash_id_corpus_rng, self.tokenizer.block_separation_token_id), and
continue to extend final_prompt with the cached value.
In `@src/aiperf/dataset/loader/parallel_convert.py`:
- Around line 138-146: Validate that final_block_size is positive before using
it: compute final_block_size as currently done (in the block handling hash_ids
inside parallel_convert.py), then check that input_length >= (len(hash_ids) - 1)
* block_size (or equivalently final_block_size > 0) and raise a clear ValueError
(or return/skip the trace) if the check fails; reference the variables hash_ids,
input_length, final_block_size and the functions get_block_tokens/decode so
maintainers can locate where to add the validation and the error message
describing the invalid sizes (include values for input_length, len(hash_ids),
and block_size in the error).
In `@tests/unit/dataset/loader/test_base_trace_loader.py`:
- Around line 38-72: Add explicit type annotations to all untyped fixtures and
test functions in this file: annotate create_jsonl_file with its
return/generator type (e.g., Iterator[Callable[[List[str]], str]]), mark
default_user_config() -> UserConfig, mock_prompt_generator() -> Mock (or
PromptGenerator protocol), and add -> None return annotations to all test_*
functions and typed parameters where applicable; update any local variables'
types if needed to satisfy type checkers (references: create_jsonl_file,
default_user_config, mock_prompt_generator, and all test_* functions).
- Around line 422-442: The test test_turn_fields_populated is missing an
assertion for the turn.delay field; update the test to assert that turn.delay
equals the expected value produced by MooncakeTraceDatasetLoader (i.e., the
delay mapping implemented in MooncakeTraceDatasetLoader.convert_to_conversations
or loader.load_dataset), for example add a line like assert turn.delay ==
<expected_delay> where <expected_delay> is the loader's computed/default delay
for the provided record (commonly 0 if no delay field is present).
ℹ️ Review info
Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: f5b177a5-4d79-4a6e-ae16-e09df52a497b
📒 Files selected for processing (28)
- src/aiperf/common/hash_id_random_generator.py
- src/aiperf/common/tokenizer.py
- src/aiperf/dataset/composer/base.py
- src/aiperf/dataset/composer/custom.py
- src/aiperf/dataset/composer/synthetic.py
- src/aiperf/dataset/composer/synthetic_rankings.py
- src/aiperf/dataset/dataset_manager.py
- src/aiperf/dataset/generator/parallel_decode.py
- src/aiperf/dataset/generator/prompt.py
- src/aiperf/dataset/loader/base_trace_loader.py
- src/aiperf/dataset/loader/parallel_convert.py
- src/aiperf/dataset/protocols.py
- src/aiperf/dataset/synthesis/rolling_hasher.py
- tests/unit/common/test_hash_id_random_generator.py
- tests/unit/common/test_tokenizer_kwarg_overrides.py
- tests/unit/dataset/composer/test_base_composer.py
- tests/unit/dataset/composer/test_custom_composer.py
- tests/unit/dataset/composer/test_synthetic_composer.py
- tests/unit/dataset/composer/test_synthetic_rankings_composer.py
- tests/unit/dataset/conftest.py
- tests/unit/dataset/generator/test_parallel_decode.py
- tests/unit/dataset/generator/test_prompt_generator.py
- tests/unit/dataset/loader/test_bailian_trace.py
- tests/unit/dataset/loader/test_base_trace_loader.py
- tests/unit/dataset/loader/test_parallel_convert.py
- tests/unit/dataset/loader/test_trace.py
- tests/unit/dataset/test_dataset_manager.py
- tests/unit/dataset/test_dataset_manager_inputs_json.py
💤 Files with no reviewable changes (2)
- src/aiperf/dataset/generator/parallel_decode.py
- tests/unit/dataset/generator/test_parallel_decode.py
def from_base_rng(cls, base_rng: RandomGenerator) -> "HashIdRandomGenerator":
    """Create from a base RandomGenerator (typically from rng.derive())."""
    base_seed = base_rng.seed or base_rng.randrange(0, 2**64)
    return cls(base_seed, _internal=True)
🧩 Analysis chain (scripts executed against repository ai-dynamo/aiperf)
# Read the file to see the actual code at lines 46-49
cat -n src/aiperf/common/hash_id_random_generator.py | head -60 | tail -20
# Find the RandomGenerator class definition to understand the seed attribute
rg "class RandomGenerator" -A 20
# Search for how seed is initialized and used
rg "seed" src/aiperf/common/hash_id_random_generator.py -B 2 -A 2
# Check the RandomGenerator class definition fully, including seed property/attribute
cat -n src/aiperf/common/random_generator.py | head -50
# Check if seed is a property or stored attribute
rg "self.seed" src/aiperf/common/random_generator.py -B 2 -A 2
# Check if there are any tests that show intended behavior with seed=0
rg "seed.*=.*0" src/ -t py --max-count 20
# Get full RandomGenerator class definition
cat -n src/aiperf/common/random_generator.py | head -150
# Check how seed attribute is defined and stored in RandomGenerator
rg "self\.seed\s*=" src/aiperf/common/random_generator.py -B 5 -A 5
# Look for any usage of from_base_rng to understand intended behavior
rg "from_base_rng" --max-count 10
# Check the tests to see if any test seed=0 behavior
cat -n tests/unit/common/test_hash_id_random_generator.py | head -100
# Look for how from_base_rng is called in real code
cat -n src/aiperf/dataset/generator/prompt.py | grep -A 5 -B 5 "from_base_rng"
Fix truthiness check to preserve explicit seed 0.
Line 48 uses or to evaluate seed, which treats 0 as falsy despite being a valid seed documented in the constructor. This breaks reproducibility when seed is intentionally set to 0.
Proposed fix
- base_seed = base_rng.seed or base_rng.randrange(0, 2**64)
+ base_seed = (
+ base_rng.seed if base_rng.seed is not None else base_rng.randrange(0, 2**64)
+    )
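The truthiness pitfall behind this fix is easy to demonstrate in isolation:

```python
seed = 0  # a valid, explicitly chosen seed

assert (seed or 123) == 123                      # `or` silently discards 0
assert (seed if seed is not None else 123) == 0  # None-check preserves it
```

Any sentinel check on values where 0, "", or [] are legitimate inputs should compare against None rather than rely on truthiness.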
def _supports_kwarg(obj: object, method_name: str, kwarg: str) -> bool:
    """Check if a method on an object accepts a specific keyword argument."""
    method = getattr(obj, method_name, None)
    if method is None:
        return False
    return kwarg in inspect.signature(method).parameters
🧩 Analysis chain (scripts executed against repository ai-dynamo/aiperf)
cd /root && find . -name "tokenizer.py" -path "*/aiperf/*" | head -5
cat -n src/aiperf/common/tokenizer.py | head -70
cat -n src/aiperf/common/tokenizer.py | sed -n '51,70p'
# Check usage of _supports_kwarg
rg "_supports_kwarg" --type py -B 2 -A 2
# Check usage of _apply_kwarg_overrides
rg "_apply_kwarg_overrides" --type py -B 2 -A 2
# Look for tokenizer instantiation and from_pretrained usage
rg "from_pretrained" src/aiperf/common/tokenizer.py -B 2 -A 2
Add error handling in _supports_kwarg for non-introspectable tokenizer methods.
Line 56 assumes inspect.signature(method) always succeeds. For some tokenizer backends (custom implementations, C extensions), signature introspection can raise ValueError or TypeError, causing tokenizer loading to fail. Wrap the call in a try-except to gracefully return False on introspection failure, matching the semantics of missing methods.
Proposed fix
 def _supports_kwarg(obj: object, method_name: str, kwarg: str) -> bool:
     """Check if a method on an object accepts a specific keyword argument."""
     method = getattr(obj, method_name, None)
     if method is None:
         return False
-    return kwarg in inspect.signature(method).parameters
+    try:
+        return kwarg in inspect.signature(method).parameters
+    except (TypeError, ValueError):
+        return False
f"Kubernetes mode: skipped local client, compressed {self._conversation_count} "
"conversations into backing store)"
Fix the log message typo.
Line 160 has an extra closing parenthesis in the message ("backing store)"), which makes logs look sloppy.
Proposed fix
- f"Kubernetes mode: skipped local client, compressed {self._conversation_count} "
- "conversations into backing store)"
+ f"Kubernetes mode: skipped local client, compressed {self._conversation_count} "
+ "conversations into backing store"
     if hash_id not in self._cache:
-        # To ensure that the prompt doesn't merge chunks, we insert a BOS or EOS token
-        # at the beginning. Length is maintained and the prompt generates the expected
-        # number of tokens. If no BOS or EOS token is available, we don't insert one.
-        prompt_tokens: list[int] = []
-        if self.tokenizer.block_separation_token_id is not None:
-            prompt_tokens += [self.tokenizer.block_separation_token_id]
-            prompt_tokens += self._sample_tokens(current_block_size - 1)
-        else:
-            prompt_tokens += self._sample_tokens(current_block_size)
-        self._cache[hash_id] = prompt_tokens  # store to cache
+        self._hash_id_corpus_rng.reseed_for_hash_id(hash_id)
+        self._cache[hash_id] = sample_tokens_from_corpus(
+            self._tokenized_corpus,
+            current_block_size,
+            self._hash_id_corpus_rng,
+            self.tokenizer.block_separation_token_id,
+        )
     final_prompt.extend(self._cache[hash_id])
Cache key is underspecified for variable block sizes.
Line 289 caches by hash_id only, but current_block_size can differ (final partial block, or different block_size across calls). This can reuse a block with the wrong length and corrupt prompt sizing/content.
Proposed fix
-    if hash_id not in self._cache:
+    cache_key = (hash_id, current_block_size)
+    if cache_key not in self._cache:
         self._hash_id_corpus_rng.reseed_for_hash_id(hash_id)
-        self._cache[hash_id] = sample_tokens_from_corpus(
+        self._cache[cache_key] = sample_tokens_from_corpus(
             self._tokenized_corpus,
             current_block_size,
             self._hash_id_corpus_rng,
             self.tokenizer.block_separation_token_id,
         )
-    final_prompt.extend(self._cache[hash_id])
+    final_prompt.extend(self._cache[cache_key])
Also update the cache annotation accordingly:
- self._cache: dict[int, list[int]] = {}
+ self._cache: dict[tuple[int, int], list[int]] = {}
hash_ids = trace["hash_ids"]
input_length = trace["input_length"]
final_block_size = input_length - (len(hash_ids) - 1) * block_size

tokens: list[int] = []
for i, hid in enumerate(hash_ids):
    size = final_block_size if i == len(hash_ids) - 1 else block_size
    tokens.extend(get_block_tokens(hid, size))
prompt = decode(tokens, skip_special_tokens=False)
🧩 Analysis chain
🏁 Scripts executed:

```shell
# Find and examine the MooncakeTrace validation logic
find . -name "mooncake_trace.py" -type f

# Examine the parallel_convert.py file to understand context around the flagged code
wc -l src/aiperf/dataset/loader/parallel_convert.py

# Find the sample_tokens_from_corpus function
rg "def sample_tokens_from_corpus" -A 10

# Examine the MooncakeTrace validation logic
cat -n src/aiperf/dataset/loader/mooncake_trace.py | head -100

# Get the full sample_tokens_from_corpus implementation
rg "def sample_tokens_from_corpus" -A 30 src/aiperf/dataset/generator/prompt.py

# Check the context around lines 138-146 in parallel_convert.py
sed -n '103,160p' src/aiperf/dataset/loader/parallel_convert.py

# Find the MooncakeTrace model definition
rg "class MooncakeTrace" -A 50 src/aiperf/dataset/loader/models.py

# Also check if there are any validators on MooncakeTrace
rg "class MooncakeTrace" -B 5 -A 80 src/aiperf/dataset/loader/models.py | head -120
```
Add validation to ensure final_block_size remains positive.
The calculation final_block_size = input_length - (len(hash_ids) - 1) * block_size can produce a non-positive value if input_length is smaller than the space required by preceding blocks. For example, with input_length=100, 5 hash_ids, and block_size=30, final_block_size becomes −20. This breaks the token sampling logic.
Add a validation check when hash_ids is present to ensure input_length >= (len(hash_ids) - 1) * block_size, or document this as a required invariant for callers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/aiperf/dataset/loader/parallel_convert.py` around lines 138 - 146,
Validate that final_block_size is positive before using it: compute
final_block_size as currently done (in the block handling hash_ids inside
parallel_convert.py), then check that input_length >= (len(hash_ids) - 1) *
block_size (or equivalently final_block_size > 0) and raise a clear ValueError
(or return/skip the trace) if the check fails; reference the variables hash_ids,
input_length, final_block_size and the functions get_block_tokens/decode so
maintainers can locate where to add the validation and the error message
describing the invalid sizes (include values for input_length, len(hash_ids),
and block_size in the error).
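A minimal sketch of the guard the review is asking for (the helper name and error wording below are hypothetical, not the actual parallel_convert.py code):

```python
def checked_final_block_size(input_length: int, num_hash_ids: int, block_size: int) -> int:
    """Return the size of the final (possibly partial) block, rejecting traces
    whose input_length cannot cover the preceding full blocks."""
    final_block_size = input_length - (num_hash_ids - 1) * block_size
    if final_block_size <= 0:
        # Include the offending values so the bad trace line is easy to find.
        raise ValueError(
            f"Invalid trace: input_length={input_length} with "
            f"{num_hash_ids} hash_ids and block_size={block_size} "
            f"yields final_block_size={final_block_size}"
        )
    return final_block_size
```

For example, `checked_final_block_size(100, 3, 30)` returns 40, while the review's counterexample `checked_final_block_size(100, 5, 30)` raises instead of silently producing −20.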
```python
def create_jsonl_file():
    """Create a temporary JSONL file with custom content."""
    filenames = []

    def _create_file(content_lines):
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
            for line in content_lines:
                f.write(line + "\n")
        filenames.append(f.name)
        return f.name

    yield _create_file

    for fn in filenames:
        Path(fn).unlink(missing_ok=True)


@pytest.fixture
def default_user_config() -> UserConfig:
    return UserConfig(endpoint=EndpointConfig(model_names=["test-model"]))


@pytest.fixture
def mock_prompt_generator():
    """Mock PromptGenerator with required attributes for BaseTraceDatasetLoader."""
    generator = Mock()
    generator.generate.return_value = "Generated prompt text"
    generator._cache = {}
    generator._tokenized_corpus = list(range(100, 200))
    generator._hash_id_corpus_rng = Mock()
    generator._hash_id_corpus_rng.seed = 42
    generator.tokenizer = Mock()
    generator.tokenizer.resolved_name = "test-model"
    generator.tokenizer.block_separation_token_id = None
    return generator
```
🛠️ Refactor suggestion | 🟠 Major
Add explicit type hints to fixtures and test methods.
Several function signatures in this file are untyped (fixtures and test_* methods). Please add parameter/return annotations (-> None for tests) to match repository standards.
Proposed pattern (apply consistently across this file)

```diff
+from collections.abc import Callable, Sequence
@@
 @pytest.fixture
-def create_jsonl_file():
+def create_jsonl_file() -> Callable[[Sequence[str]], str]:
@@
-    def _create_file(content_lines):
+    def _create_file(content_lines: Sequence[str]) -> str:
@@
 @pytest.fixture
-def mock_prompt_generator():
+def mock_prompt_generator() -> Mock:
@@
-    def test_hash_is_16_hex_chars(self, create_jsonl_file):
+    def test_hash_is_16_hex_chars(
+        self, create_jsonl_file: Callable[[Sequence[str]], str]
+    ) -> None:
```

As per coding guidelines, "Add type hints on ALL functions (params and return)".
Also applies to: 83-605
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/dataset/loader/test_base_trace_loader.py` around lines 38 - 72,
Add explicit type annotations to all untyped fixtures and test functions in this
file: annotate create_jsonl_file with its return/generator type (e.g.,
Iterator[Callable[[List[str]], str]]), mark default_user_config() -> UserConfig,
mock_prompt_generator() -> Mock (or PromptGenerator protocol), and add -> None
return annotations to all test_* functions and typed parameters where
applicable; update any local variables' types if needed to satisfy type checkers
(references: create_jsonl_file, default_user_config, mock_prompt_generator, and
all test_* functions).
```python
def test_turn_fields_populated(
    self, create_jsonl_file, mock_prompt_generator, default_user_config
):
    """Turn objects should have correct timestamp, delay, max_tokens."""
    content = [
        '{"input_length": 100, "hash_ids": [1], "timestamp": 5000, "output_length": 42}'
    ]
    filename = create_jsonl_file(content)

    loader = MooncakeTraceDatasetLoader(
        filename=filename,
        user_config=default_user_config,
        prompt_generator=mock_prompt_generator,
    )
    data = loader.load_dataset()
    conversations = list(loader.convert_to_conversations(data))

    turn = conversations[0].turns[0]
    assert turn.timestamp == 5000
    assert turn.max_tokens == 42
```
delay is documented but not actually validated.
The test description says delay is verified, but there is no assertion for turn.delay. This leaves a regression gap in turn field mapping.
Suggested test fix

```diff
-    content = [
-        '{"input_length": 100, "hash_ids": [1], "timestamp": 5000, "output_length": 42}'
-    ]
+    content = [
+        '{"input_length": 100, "hash_ids": [1], "timestamp": 5000, "delay": 250, "output_length": 42}'
+    ]
@@
     turn = conversations[0].turns[0]
     assert turn.timestamp == 5000
+    assert turn.delay == 250
     assert turn.max_tokens == 42
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/dataset/loader/test_base_trace_loader.py` around lines 422 - 442,
The test test_turn_fields_populated is missing an assertion for the turn.delay
field; update the test to assert that turn.delay equals the expected value
produced by MooncakeTraceDatasetLoader (i.e., the delay mapping implemented in
MooncakeTraceDatasetLoader.convert_to_conversations or loader.load_dataset), for
example add a line like assert turn.delay == <expected_delay> where
<expected_delay> is the loader's computed/default delay for the provided record
(commonly 0 if no delay field is present).
Tip
Full 23k mooncake_trace.jsonl:
- Linux i9-14900k: from 16s down to ~6-7s
- MacBook Pro M4: from 20s down to 10s
Add HashIdRandomGenerator that deterministically seeds per (trace_id, hash_id) pair, enabling parallel token generation across workers without lock contention or cache coordination. Extract parallel_convert module to leverage multiprocessing with shared-memory token corpus. Stream conversations through composers to the backing store instead of materializing the full dataset in memory.
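The lock-free parallelism described above can be sketched in isolation (the class and method names mirror the description, but this is an illustrative assumption, not the actual aiperf implementation): because the RNG seed is a pure function of (seed, hash_id), any worker can regenerate the same token block independently, with no shared cache or coordination.

```python
import hashlib
import random


class HashIdRandomGenerator:
    """Illustrative sketch of deterministic per-hash_id seeding."""

    def __init__(self, seed: int) -> None:
        self.seed = seed
        self._rng = random.Random()

    def reseed_for_hash_id(self, hash_id: int) -> None:
        # Mix the global seed with the hash_id through a keyed, stable hash so
        # the token sequence depends only on (seed, hash_id), never on which
        # worker runs first or how many blocks were generated before this one.
        digest = hashlib.blake2b(
            hash_id.to_bytes(8, "little", signed=True),
            key=self.seed.to_bytes(8, "little"),
            digest_size=8,
        ).digest()
        self._rng.seed(int.from_bytes(digest, "little"))

    def randint(self, lo: int, hi: int) -> int:
        return self._rng.randint(lo, hi)
```

Two workers constructed with the same global seed produce identical blocks for the same hash_id, which is what lets the loader fan trace conversion out across processes without reconciling caches afterwards.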